使用 PySpark 处理数据笔记
调用 Spark 能力处理数据
调用 Master 几种模式:
local
local[K]
local[*]
spark://host:port
yarn
sparkSession >unfolded 1
2
3
4
5
6
7# 调用 Standalone 模式下的集群能力,同时设置了 worker 的内存使用配置
spark = SparkSession \
.builder \
.appName("C3Analysis") \
.master("spark://hn-manager:7077") \
.config("spark.executor.memory", "13G") \
.getOrCreate()
在读取数据时,分为本地数据读取和 HDFS 数据读取。需要本地读取数据那么需要通过
file://file_path
的方式,可以直接添加目录,读取目录中的数据hdfs 管理,使用 HDFS 管理文件目录时,需要注意使用相关的用户权限
环境变量添加与修改 在使用 PySpark 功能时,存在需要修改环境变量
PPYSPARK_PYTHON
和PYSPARK_DRIVER_PYTHON
为临时环境值。该问题的解决方案是修改os
package 中的environ
实例:1
2
3import os
os.environ["PYSPARK_PYTHON"]="/data/Anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/data/Anaconda3/bin/python"PySpark 退出服务 在运行完 PySpark 程序之后,需要关闭 Spark 的话需要使用
SparkContext
的stop()
方法关闭或者使用System.exit(0)
命令或者sys.exit()
方法退出应用SparkDataFrame 增加行索引 使用
F.monotonically_increasing_id()
方法可以增加行索引,但是得到的索引值并不是显性排序的——得到的结果的索引和是否为单机、数据是否分为了多个parlition
等原因相关。命令方式如下:data = data.select(F.monotonically_increasing_id().alias("index"), "*")
使用定制化的 Java 方法 思路上使用 Py4J 实现 Python 中数据和 JVM 通信的方式,而在 PySpark 已经支持了 JVM。因此重要的节点上实现 Java 中的类被加载在 PySpark shell中,实现的方法是:
- 在启动 PySpark 阶段需要添加上包含了类的 JAR 包,eg:
pyspark --master local --driver-class-path <jar 包路径>
命令可以在单机模式下将添加 jar 包(在 Spark 2.4.0 版本中使用--jars
参数方式替换--driver-class-path
也是可行的) - 以 SparkContext 对象的 jvm 调用相关 Java 类方法,例如命令
sc._jvm.com.python.test.TestDem0001.concatStatic('1', 'a', 'b')
是表示调用com.python.test
的 package 下TestDem0001
类的concatStatic
方法
- 在启动 PySpark 阶段需要添加上包含了类的 JAR 包,eg:
PySpark 中的 SparkDataFrame 字段名称是没有区分大小写的,例如
df.select("ITV_accounT").head(3)
和df.select("itv_account").head(3)
都可以得到结果